package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.PeerSync;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Syncs the replicas of a shard with a (potential) leader core.
 * <p>
 * The leader candidate first syncs itself against the other replicas of its
 * shard using {@link PeerSync}. If that succeeds, every replica is asked,
 * through a {@link ShardHandler}, to sync against the leader. Replicas that do
 * not report a successful sync are asked to recover. Recovery requests are
 * sent asynchronously.
 * <p>
 * Syncing is skipped entirely when the {@code solrcloud.skip.autorecovery}
 * system property is {@code true}.
 */
public class SyncStrategy {
	protected final Logger log = LoggerFactory.getLogger(getClass());

	// Read once per instance from the "solrcloud.skip.autorecovery" system
	// property; when true, sync() is a no-op that reports success.
	private final boolean SKIP_AUTO_RECOVERY = Boolean
			.getBoolean("solrcloud.skip.autorecovery");

	// Used to submit sync requests to replicas and collect their responses.
	private final ShardHandler shardHandler;

	// Direct-handoff pool (SynchronousQueue, no core threads, effectively
	// unbounded max) that runs recovery requests in the background; idle
	// threads are reclaimed after 5 seconds. Shut down by close().
	private final ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
			0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
					"recoveryCmdExecutor"));

	// Set by close(); checked before each phase so a closing node stops
	// syncing and stops asking replicas to recover.
	private volatile boolean isClosed;

	// HTTP client shared by all instances for sync requests: at most 10000
	// connections total and 20 per host, 30s connect/socket timeouts, and no
	// automatic retries.
	private final static HttpClient client;
	static {
		ModifiableSolrParams params = new ModifiableSolrParams();
		params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
		params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
		params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
		params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
		params.set(HttpClientUtil.PROP_USE_RETRY, false);
		client = HttpClientUtil.createClient(params);
	}

	/**
	 * Creates a sync strategy whose shard handler uses the shared static
	 * HTTP client.
	 */
	public SyncStrategy() {
		shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
	}

	/**
	 * A sync request that also remembers which core it targeted, so a failed
	 * sync can be turned into a recovery request for that core.
	 */
	private static class ShardCoreRequest extends ShardRequest {
		String coreName;
		public String baseUrl;
	}

	/**
	 * Syncs {@code core} with the other replicas of its shard. On success, it
	 * also asks those replicas to sync back to the leader.
	 *
	 * @param zkController
	 *            used to look up the shard's replicas
	 * @param core
	 *            the core that is (potentially) becoming leader
	 * @param leaderProps
	 *            ZooKeeper properties describing the leader
	 * @return {@code true} if the sync succeeded or auto recovery is skipped;
	 *         {@code false} if the core has no update log, this strategy has
	 *         been closed, or the sync failed
	 */
	public boolean sync(ZkController zkController, SolrCore core,
			ZkNodeProps leaderProps) {
		if (SKIP_AUTO_RECOVERY) {
			return true;
		}
		log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
		// TODO: look at our state usage of sync
		// zkController.publish(core, ZkStateReader.SYNC);

		// solrcloud_debug
		// System.out.println("SYNC UP");
		if (core.getUpdateHandler().getUpdateLog() == null) {
			log.error("No UpdateLog found - cannot sync");
			return false;
		}
		return syncReplicas(zkController, core, leaderProps);
	}

	/**
	 * Syncs this core against its replicas, then (if that worked) asks the
	 * replicas to sync to this core. Exceptions from either phase are logged,
	 * not propagated.
	 *
	 * @return whether this core's own sync against its replicas succeeded
	 */
	private boolean syncReplicas(ZkController zkController, SolrCore core,
			ZkNodeProps leaderProps) {
		boolean success = false;
		CloudDescriptor cloudDesc = core.getCoreDescriptor()
				.getCloudDescriptor();
		String collection = cloudDesc.getCollectionName();
		String shardId = cloudDesc.getShardId();

		if (isClosed) {
			log.info("We have been closed, won't sync with replicas");
			return false;
		}
		// if no one that is up is active, we are willing to wait...
		// we don't want a recovering node to become leader and then
		// a better candidate pops up a second later.
		// int tries = 20;
		// while (!areAnyReplicasActive(zkController, collection, shardId)) {
		// if (tries-- == 0) {
		// break;
		// }
		// try {
		// Thread.sleep(500);
		// } catch (InterruptedException e) {
		// Thread.currentThread().interrupt();
		// }
		// }

		// first sync ourselves - we are the potential leader after all
		try {
			success = syncWithReplicas(zkController, core, leaderProps,
					collection, shardId);
		} catch (Exception e) {
			SolrException.log(log, "Sync Failed", e);
		}
		try {
			if (isClosed) {
				log.info("We have been closed, won't attempt to sync replicas back to leader");
				return false;
			}

			if (success) {
				log.info("Sync Success - now sync replicas to me");

				syncToMe(zkController, collection, shardId, leaderProps);

			} else {
				SolrException.log(log, "Sync Failed");
				// lets see who seems ahead...
			}

		} catch (Exception e) {
			SolrException.log(log, "Sync Failed", e);
		}

		return success;
	}

	/**
	 * Runs a {@link PeerSync} of {@code core} against the core URLs of the
	 * other replicas of the shard.
	 *
	 * @return {@code true} if there are no replicas, otherwise the result of
	 *         {@link PeerSync#sync()}
	 */
	private boolean syncWithReplicas(ZkController zkController, SolrCore core,
			ZkNodeProps props, String collection, String shardId) {
		List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
				.getReplicaProps(collection, shardId,
						props.getStr(ZkStateReader.NODE_NAME_PROP),
						props.getStr(ZkStateReader.CORE_NAME_PROP));

		if (nodes == null) {
			// I have no replicas
			return true;
		}

		List<String> syncWith = new ArrayList<String>();
		for (ZkCoreNodeProps node : nodes) {
			syncWith.add(node.getCoreUrl());
		}

		// if we can't reach a replica for sync, we still consider the overall
		// sync a success
		// TODO: as an assurance, we should still try and tell the sync nodes
		// that we couldn't reach
		// to recover once more?
		PeerSync peerSync = new PeerSync(core, syncWith, core
				.getUpdateHandler().getUpdateLog().numRecordsToKeep, true);
		return peerSync.sync();
	}

	/**
	 * Asks every other replica of the shard to sync against the leader. Any
	 * replica that does not report a successful sync is asked to recover,
	 * unless this strategy has been closed.
	 */
	private void syncToMe(ZkController zkController, String collection,
			String shardId, ZkNodeProps leaderProps) {

		// Submit a sync request to every other replica first, then collect
		// all responses below.
		List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
				.getReplicaProps(collection, shardId,
						leaderProps.getStr(ZkStateReader.NODE_NAME_PROP),
						leaderProps.getStr(ZkStateReader.CORE_NAME_PROP));
		if (nodes == null) {
			log.info(ZkCoreNodeProps.getCoreUrl(leaderProps)
					+ " has no replicas");
			return;
		}

		ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
		for (ZkCoreNodeProps node : nodes) {
			try {
				log.info(ZkCoreNodeProps.getCoreUrl(leaderProps)
						+ ": try and ask " + node.getCoreUrl() + " to sync");

				requestSync(node.getBaseUrl(), node.getCoreUrl(),
						zkLeader.getCoreUrl(), node.getCoreName());

			} catch (Exception e) {
				SolrException.log(log, "Error syncing replica to leader", e);
			}
		}

		// Drain every response. handleResponse never throws, so a malformed
		// response cannot abort the loop. Otherwise unconsumed responses would
		// stay pending in the shared shardHandler.
		for (;;) {
			ShardResponse srsp = shardHandler.takeCompletedOrError();
			if (srsp == null)
				break;
			boolean success = handleResponse(srsp);
			Throwable requestError = srsp.getException();
			if (requestError != null) {
				// Pass the throwable as the cause so the stack trace is kept.
				SolrException.log(log, "Sync request error: " + requestError,
						requestError);
			}

			if (!success) {
				try {
					log.info(ZkCoreNodeProps.getCoreUrl(leaderProps)
							+ ": Sync failed - asking replica ("
							+ srsp.getShardAddress() + ") to recover.");
					if (isClosed) {
						log.info("We have been closed, don't request that a replica recover");
					} else {
						ShardCoreRequest failedRequest = (ShardCoreRequest) srsp
								.getShardRequest();
						requestRecovery(leaderProps, failedRequest.baseUrl,
								failedRequest.coreName);
					}
				} catch (Throwable t) {
					SolrException.log(log,
							ZkCoreNodeProps.getCoreUrl(leaderProps)
									+ ": Could not tell a replica to recover",
							t);
				}
			} else {
				log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": "
						+ " sync completed with " + srsp.getShardAddress());
			}

		}

	}

	/**
	 * Decides whether a replica reported a successful sync.
	 *
	 * @return {@code true} only if the response contains a {@code "sync"}
	 *         entry equal to {@link Boolean#TRUE}; a missing response, a
	 *         missing entry, or a non-boolean value counts as failure
	 */
	private boolean handleResponse(ShardResponse srsp) {
		// NOTE(review): the response (or its NamedList) is sometimes null,
		// presumably when the request itself failed - TODO confirm.
		if (srsp.getSolrResponse() == null) {
			return false;
		}
		NamedList<Object> response = srsp.getSolrResponse().getResponse();
		if (response == null) {
			return false;
		}
		// equals() instead of a cast: an unexpected value type must not throw
		// a ClassCastException out of the response-collection loop.
		return Boolean.TRUE.equals(response.get("sync"));
	}

	/**
	 * Submits a non-distributed {@code /get} request that asks {@code replica}
	 * to sync against {@code leaderUrl} (using 100 recent versions). The
	 * response is collected later from {@link #shardHandler}.
	 *
	 * @param baseUrl
	 *            base URL of the replica's node, kept for a possible recovery
	 *            request
	 * @param replica
	 *            core URL of the replica to ask
	 * @param leaderUrl
	 *            core URL of the leader to sync against
	 * @param coreName
	 *            name of the replica's core, kept for a possible recovery
	 *            request
	 */
	private void requestSync(String baseUrl, String replica, String leaderUrl,
			String coreName) {
		ShardCoreRequest sreq = new ShardCoreRequest();
		sreq.coreName = coreName;
		sreq.baseUrl = baseUrl;
		sreq.purpose = 1;
		// TODO: this sucks
		if (replica.startsWith("http://"))
			replica = replica.substring(7);
		sreq.shards = new String[] { replica };
		sreq.actualShards = sreq.shards;
		sreq.params = new ModifiableSolrParams();
		sreq.params.set("qt", "/get");
		sreq.params.set("distrib", false);
		sreq.params.set("getVersions", Integer.toString(100));
		sreq.params.set("sync", leaderUrl);

		shardHandler.submit(sreq, replica, sreq.params);
	}

	/**
	 * Marks this strategy as closed and shuts down the recovery executor
	 * (via {@code ExecutorUtil.shutdownNowAndAwaitTermination}). Syncs still
	 * in progress stop at their next closed check.
	 */
	public void close() {
		this.isClosed = true;
		ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
	}

	/**
	 * Asynchronously sends a REQUESTRECOVERY core admin command to the given
	 * core. Failures are logged, not propagated.
	 *
	 * @param leaderProps
	 *            leader properties, used only for log messages
	 * @param baseUrl
	 *            base URL of the node hosting the core
	 * @param coreName
	 *            name of the core that should recover
	 */
	private void requestRecovery(final ZkNodeProps leaderProps,
			final String baseUrl, final String coreName)
			throws SolrServerException, IOException {
		// A plain Runnable: the executor supplies the thread. Wrapping the work
		// in a never-started Thread object only wasted an allocation.
		Runnable recoveryTask = new Runnable() {
			@Override
			public void run() {
				RequestRecovery recoverRequestCmd = new RequestRecovery();
				recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
				recoverRequestCmd.setCoreName(coreName);

				HttpSolrServer server = new HttpSolrServer(baseUrl);
				try {
					server.setConnectionTimeout(45000);
					server.setSoTimeout(45000);
					server.request(recoverRequestCmd);
				} catch (Throwable t) {
					SolrException.log(log,
							ZkCoreNodeProps.getCoreUrl(leaderProps)
									+ ": Could not tell a replica to recover",
							t);
				} finally {
					// NOTE(review): HttpSolrServer(String) presumably builds its
					// own HttpClient; shut it down so its connections are not
					// leaked once per recovery request.
					server.shutdown();
				}
			}
		};
		recoveryCmdExecutor.execute(recoveryTask);
	}

	/**
	 * Builds request parameters from alternating name/value pairs, for example
	 * {@code params("q", "*:*", "rows", "0")}.
	 *
	 * @param params
	 *            alternating names and values
	 * @return the parameters, with repeated names added as multiple values
	 * @throws IllegalArgumentException
	 *             if an odd number of strings is given
	 */
	public static ModifiableSolrParams params(String... params) {
		if (params.length % 2 != 0) {
			throw new IllegalArgumentException(
					"Expected name/value pairs but got an odd number of arguments: "
							+ params.length);
		}
		ModifiableSolrParams msp = new ModifiableSolrParams();
		for (int i = 0; i < params.length; i += 2) {
			msp.add(params[i], params[i + 1]);
		}
		return msp;
	}
}
